#!/usr/bin/env python3

# Copyright (c) 2023, Arm Limited and Contributors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging  # noqa: F401
from functools import partial
from dataclasses import dataclass
import os
import threading
import pytest
import socket
import time
from pw_hdlc.rpc import HdlcRpcClient, default_channels

from pyedmgr import (  # noqa: F401
    AbstractChannel,
    SynchronousSocketChannel,
    TestCaseContext,
    TestDevice,
    fixture_test_case,  # noqa: F811
    logger,  # noqa: F811
)

# --- IOT socket constants

# Address families; the fixtures below pass IOT_SOCKET_AF_INET / IOT_SOCKET_AF_INET6
# as the `af` argument of IotSocketService.Create.
IOT_SOCKET_AF_INET_UNSPECIFIED = 0
IOT_SOCKET_AF_INET = 1
IOT_SOCKET_AF_INET6 = 2

# Socket types; passed as the `type` argument of IotSocketService.Create.
IOT_SOCKET_SOCK_UNSPECIFIED = 0
IOT_SOCKET_SOCK_STREAM = 1
IOT_SOCKET_SOCK_DGRAM = 2


# Protocols; passed as the `protocol` argument of IotSocketService.Create.
IOT_SOCKET_IPPROTO_UNSPECIFIED = 0
IOT_SOCKET_IPPROTO_TCP = 1
IOT_SOCKET_IPPROTO_UDP = 2


# NOTE(review): presumably socket option levels; not referenced in the part of
# the file reviewed here.
IOT_SOCKET_LEVEL_SOL_SOCKET = 1
IOT_SOCKET_LEVEL_IPPROTO_IP = 2
IOT_SOCKET_LEVEL_IPPROTO_TCP = 3
IOT_SOCKET_LEVEL_IPPROTO_IPV6 = 4

# Option ids; the tests pass these as `opt_id` to IotSocketService.SetOpt / GetOpt.
IOT_SOCKET_IO_FIONBIO = 1
IOT_SOCKET_SO_RCVTIMEO = 2
IOT_SOCKET_SO_SNDTIMEO = 3
IOT_SOCKET_SO_KEEPALIVE = 4
IOT_SOCKET_SO_TYPE = 5
IOT_SOCKET_SO_REUSEADDR = 6
IOT_SOCKET_SO_BINDTODEVICE = 7
IOT_SOCKET_SO_LINGER = 8
IOT_SOCKET_SO_BROADCAST = 9
IOT_SOCKET_IP_MULTICAST_IF = 10
IOT_SOCKET_IP_MULTICAST_TTL = 11
IOT_SOCKET_IP_MULTICAST_LOOP = 12
IOT_SOCKET_IP_PKTINFO = 13
IOT_SOCKET_IP_ADD_MEMBERSHIP = 14
IOT_SOCKET_IP_DROP_MEMBERSHIP = 15
IOT_SOCKET_IPV6_V6ONLY = 16
IOT_SOCKET_IPV6_PKTINFO = 17
IOT_SOCKET_IPV6_MULTICAST_IF = 18
IOT_SOCKET_IPV6_MULTICAST_HOPS = 19
IOT_SOCKET_IPV6_MULTICAST_LOOP = 20
IOT_SOCKET_IPV6_ADD_MEMBERSHIP = 21
IOT_SOCKET_IPV6_DROP_MEMBERSHIP = 22
IOT_SOCKET_TCP_NODELAY = 23
IOT_SOCKET_TCP_KEEPIDLE = 24
IOT_SOCKET_TCP_KEEPINTVL = 25
IOT_SOCKET_TCP_KEEPCNT = 26

# Error codes; the tests compare `response.status` of an RPC reply against these
# (all are negative, successful replies are asserted with `status >= 0`).
IOT_SOCKET_ERROR = -1
IOT_SOCKET_ESOCK = -2
IOT_SOCKET_EINVAL = -3
IOT_SOCKET_ENOTSUP = -4
IOT_SOCKET_ENOMEM = -5
IOT_SOCKET_EAGAIN = -6
IOT_SOCKET_EINPROGRESS = -7
IOT_SOCKET_ETIMEDOUT = -8
IOT_SOCKET_EISCONN = -9
IOT_SOCKET_ENOTCONN = -10
IOT_SOCKET_ECONNREFUSED = -11
IOT_SOCKET_ECONNRESET = -12
IOT_SOCKET_ECONNABORTED = -13
IOT_SOCKET_EALREADY = -14
IOT_SOCKET_EADDRINUSE = -15
IOT_SOCKET_EHOSTNOTFOUND = -16

# NOTE(review): presumably send/receive message flags (bit values); not
# referenced in the part of the file reviewed here.
IOT_SOCKET_MSG_PEEK = 1
IOT_SOCKET_MSG_DONTWAIT = 2
IOT_SOCKET_MSG_MORE = 4

# NOTE(review): presumably flags reported for received messages; the values
# overlap with the MSG_PEEK / MSG_DONTWAIT values above.
IOT_SOCKET_MSG_TRUNC = 1
IOT_SOCKET_MSG_CTRUNC = 2

# Shutdown modes; passed as the `option` argument of IotSocketService.Shutdown.
IOT_SOCKET_SHUTDOWN_RD = 1
IOT_SOCKET_SHUTDOWN_WR = 2
IOT_SOCKET_SHUTDOWN_RDWR = 3

# Read from the PROTO environment variable (KeyError at import time when unset);
# handed to HdlcRpcClient as a one-element list in rpc_service_setup.
PROTO = os.environ["PROTO"]
# Bound as the argument of rpc_channel.read() for the RPC client's reader.
RPC_READ_SIZE = 1
# Terminal indices passed as `terminal=` to device.controller.get_channel().
LOG_TERMINAL = 0
RPC_TERMINAL = 1


# --- information about the test environment is contained in the env fixture

# access information in tests like this:
# env.fvp.ip <--- ip of the fvp (already ipv6 or ipv4, depending on the test)
# env.host.ip <--- ip of the host (the machine running this python script)
# and so on; see class socket_info_type below for what is available

# NOTE: despite the *_SOCKET names these are port numbers; they fill the `port`
# field of the socket_info_type entries built further down.
FVP_SOCKET = 5555
HOST_SOCKET = 5556
HOST_IP4 = "10.200.1.2"
FVP_IP4 = "10.200.1.42"
# The host IPv6 address comes from the BRIDGEIP environment variable; importing
# this module raises KeyError when it is not set.
HOST_IP6 = os.environ["BRIDGEIP"]
FVP_IP6 = "fe80::223:c1ff:fede:1"
HOST_ANY_IP4 = "0.0.0.0"
HOST_ANY_IP6 = "::"
# The FVP wildcard addresses are raw bytes (4 for IPv4, 16 for IPv6): they are
# sent as the `ip` field of IotSocketService.Bind requests.
FVP_ANY_IP4 = bytes([0, 0, 0, 0])
FVP_ANY_IP6 = bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

# Get the scope ID for the bridge
BRIDGEINTERFACE = os.environ["BRIDGEINTERFACE"]
# getaddrinfo() returns (family, type, proto, canonname, sockaddr) tuples; for
# AF_INET6 the sockaddr is (host, port, flowinfo, scope_id), so [0][4][3] is the
# scope id of the first result.
SCOPE_ID = socket.getaddrinfo(
    f"{HOST_IP6}%{BRIDGEINTERFACE}", HOST_SOCKET, socket.AF_INET6, socket.SOCK_STREAM
)[0][4][3]


@dataclass
class socket_info_type:
    """Addressing details for one endpoint (host or FVP) and one IP version."""

    ip: str  # textual IP address of the endpoint
    # Wildcard ("any") address. NOTE(review): the FVP entries store bytes here
    # (FVP_ANY_IP4 / FVP_ANY_IP6), so the `str` annotation only fits the host.
    any_ip: str
    port: int  # port number used by the endpoint
    # Address family: socket.AF_* for the host entries, IOT_SOCKET_AF_* for the FVP.
    af_type: int
    # (ip, port) for IPv4 entries, (ip, port, 0, SCOPE_ID) for IPv6 entries.
    address: tuple


@dataclass
class env_info_type:
    """Pairing of the host and FVP endpoint descriptions for one IP version."""

    index: int  # position of the entry in env_info; the only field that is hashed
    host: socket_info_type
    fvp: socket_info_type

    def __hash__(self):
        # An explicit __hash__ keeps instances hashable: @dataclass would otherwise
        # disable hashing because it generates __eq__ for this non-frozen class.
        return hash(self.index)


# Every tuple below is indexed with is_ipv6 (0 -> IPv4 entry, 1 -> IPv6 entry).
socket_info_host = (
    socket_info_type(
        ip=HOST_IP4,
        any_ip=HOST_ANY_IP4,
        port=HOST_SOCKET,
        af_type=socket.AF_INET,
        address=(HOST_IP4, HOST_SOCKET),
    ),
    socket_info_type(
        ip=HOST_IP6,
        any_ip=HOST_ANY_IP6,
        port=HOST_SOCKET,
        af_type=socket.AF_INET6,
        address=(HOST_IP6, HOST_SOCKET, 0, SCOPE_ID),
    ),
)

socket_info_fvp = (
    socket_info_type(
        ip=FVP_IP4,
        any_ip=FVP_ANY_IP4,
        port=FVP_SOCKET,
        af_type=IOT_SOCKET_AF_INET,
        address=(FVP_IP4, FVP_SOCKET),
    ),
    socket_info_type(
        ip=FVP_IP6,
        any_ip=FVP_ANY_IP6,
        port=FVP_SOCKET,
        af_type=IOT_SOCKET_AF_INET6,
        address=(FVP_IP6, FVP_SOCKET, 0, SCOPE_ID),
    ),
)

# One env_info_type per IP version, pairing the host and FVP entries that share
# the same index.
env_info = tuple(
    env_info_type(index, host_info, fvp_info)
    for index, (host_info, fvp_info) in enumerate(
        zip(socket_info_host, socket_info_fvp)
    )
)


# --- Helper functions


def rpc_service_setup(context: TestCaseContext):
    """Open the device terminals and return the IotSocketService RPC stub.

    Closes the default channel of the first allocated device, opens synchronous
    channels on the log and RPC terminals, and builds an HdlcRpcClient on top of
    the RPC channel.

    Args:
        context: test case context whose first allocated device is used.

    Returns:
        The ``iotsdk.socket.pw_rpc.IotSocketService`` stub of the RPC client.
    """
    device = context.allocated_devices[0]
    controller = device.controller

    device.channel.close()
    log_channel: SynchronousSocketChannel = controller.get_channel(
        terminal=LOG_TERMINAL, sync=True
    )
    log_channel.open()
    # NOTE(review): the default channel is closed a second time here, exactly as
    # in the original sequence; confirm whether this call is still required.
    device.channel.close()

    rpc_channel = controller.get_channel(terminal=RPC_TERMINAL, sync=True)
    rpc_channel.open()

    reader = partial(rpc_channel.read, RPC_READ_SIZE)
    channels = default_channels(rpc_channel.write)
    client = HdlcRpcClient(reader, [PROTO], channels)

    # Wait for the device to be properly initialized
    time.sleep(1)
    return client.rpcs().iotsdk.socket.pw_rpc.IotSocketService


def socket_accept(recv_socket, service):
    """Accept one connection on either a host socket or an FVP socket.

    Args:
        recv_socket: a listening Python socket when ``service`` is None, otherwise
            the FVP socket id handed to ``service.Accept``.
        service: the RPC service stub, or None for a host-side Python socket.

    Returns:
        ``(connection, address)``: for a host socket the result of
        ``recv_socket.accept()``; for an FVP socket the reply status (asserted to
        be >= 0) and the ``(ip, port)`` pair from the reply.
    """
    if service is None:
        # Host side: plain blocking accept on the Python socket.
        connection, peer = recv_socket.accept()
        return connection, peer

    rpc_status, reply = service.Accept(socket=recv_socket)
    assert rpc_status.ok() and reply.status >= 0
    return reply.status, (reply.ip, reply.port)


# init arguments are: socket, service (if the socket is a py socket set it to None)
class AcceptThread(threading.Thread):
    """Run ``socket_accept(*args)`` on a background thread.

    Init arguments are: socket, service (pass None as service when the socket is
    a Python socket).

    After ``join()`` returns, ``socket`` and ``address`` hold the pair returned by
    socket_accept. If socket_accept raised (for example a failed assertion), the
    exception is re-raised by ``join()`` in the joining thread, so the real
    failure is reported rather than an AttributeError on ``socket``.
    """

    def __init__(self, *args):
        super().__init__()
        self.args = args
        # Defined up front so the attributes exist even when run() fails.
        self.socket = None
        self.address = None
        self.error = None

    def run(self):
        try:
            self.socket, self.address = socket_accept(*self.args)
        except Exception as exc:
            # Keep the failure for join(); an exception escaping run() would only
            # be reported through threading.excepthook.
            self.error = exc

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception captured in run()."""
        super().join(timeout)
        if self.error is not None:
            error, self.error = self.error, None
            raise error


def socket_select(service, read_sockets, write_sockets, timeout_ms):
    """Call ``service.Select`` and unpack its reply.

    Args:
        service: the RPC service stub.
        read_sockets: forwarded as ``read_sockets`` of the request.
        write_sockets: forwarded as ``write_sockets`` of the request.
        timeout_ms: forwarded as ``timeout_ms`` of the request.

    Returns:
        ``(status, read_sockets, write_sockets)`` taken from the reply. Only the
        RPC status is asserted; the reply status is left for the caller to check.
    """
    rpc_status, reply = service.Select(
        read_sockets=read_sockets,
        write_sockets=write_sockets,
        timeout_ms=timeout_ms,
    )
    assert rpc_status.ok()
    return reply.status, reply.read_sockets, reply.write_sockets


# init arguments are: service, read_sockets, write_sockets, timeout_ms
class SelectThread(threading.Thread):
    """Run ``socket_select(*args)`` on a background thread.

    Init arguments are: service, read_sockets, write_sockets, timeout_ms.

    After ``join()`` returns, ``status``, ``read_sockets`` and ``write_sockets``
    hold the values returned by socket_select. If socket_select raised, the
    exception is re-raised by ``join()`` in the joining thread.
    """

    def __init__(self, *args):
        super().__init__()
        self.args = args
        # Defined up front so the attributes exist even when run() fails.
        self.status = None
        self.read_sockets = None
        self.write_sockets = None
        self.error = None

    def run(self):
        try:
            self.status, self.read_sockets, self.write_sockets = socket_select(
                *self.args
            )
        except Exception as exc:
            # Keep the failure for join(); an exception escaping run() would only
            # be reported through threading.excepthook.
            self.error = exc

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception captured in run()."""
        super().join(timeout)
        if self.error is not None:
            error, self.error = self.error, None
            raise error


def connect_socket_fvp_to_host(
    service, fvp_socket, fvp_port, host_socket, host_port, so_type, is_ipv6
):
    """Prepare a host socket to receive from an FVP socket.

    The host socket is always bound to the host wildcard address and
    ``host_port``. The FVP socket is bound to ``fvp_port`` only when that port is
    neither None nor 0. For stream sockets the host socket is put into listening
    mode and the FVP socket connects to it while a helper thread accepts.

    Returns:
        ``(host_socket, host_listen_socket)``: for stream sockets the accepted
        connection and the listening socket; otherwise the bound host socket and
        None.
    """
    env = env_info[is_ipv6]
    if is_ipv6 == 1:
        dest_ip = socket.inet_pton(socket.AF_INET6, env.host.ip)
    else:
        dest_ip = socket.inet_aton(env.host.ip)

    host_socket.bind((env.host.any_ip, host_port))
    if fvp_port not in (None, 0):
        status, response = service.Bind(
            socket=fvp_socket,
            ip=env.fvp.any_ip,
            ip_len=len(env.fvp.any_ip),
            port=fvp_port,
        )
        assert status.ok() and response.status >= 0

    if so_type != IOT_SOCKET_SOCK_STREAM:
        # Nothing to connect: the bound host socket is used as is.
        return host_socket, None

    listener = host_socket
    listener.listen()
    acceptor = AcceptThread(listener, None)
    acceptor.start()
    status, response = service.Connect(
        socket=fvp_socket, ip=dest_ip, ip_len=len(dest_ip), port=host_port
    )
    # Join before checking the Connect result, as the original sequence does.
    acceptor.join()
    connection = acceptor.socket
    assert status.ok() and response.status >= 0

    return connection, listener


def connect_socket_host_to_fvp(
    service, host_socket, host_port, fvp_socket, fvp_port, so_type, is_ipv6
):
    """Prepare an FVP socket to receive from a host socket.

    The FVP socket is always bound to the FVP wildcard address and ``fvp_port``.
    The host socket is bound to ``host_port`` only when that port is neither None
    nor 0 (this used to be gated on ``fvp_port``, i.e. on the wrong endpoint).
    For stream sockets the FVP socket listens and the host socket connects to it
    while a helper thread performs the accept RPC.

    Args:
        service: the RPC service stub.
        host_socket: Python socket on the host.
        host_port: local port for the host socket, or None/0 to leave it unbound.
        fvp_socket: FVP socket id.
        fvp_port: port the FVP socket is bound to.
        so_type: IOT_SOCKET_SOCK_STREAM or IOT_SOCKET_SOCK_DGRAM.
        is_ipv6: 0 or 1, index into env_info.

    Returns:
        For stream sockets the FVP socket id returned by the accept, otherwise
        ``fvp_socket`` unchanged.
    """
    env = env_info[is_ipv6]

    # Bind the host socket only when the caller asked for a specific local port.
    if host_port is not None and host_port != 0:
        host_socket.bind((env.host.any_ip, host_port))

    status, response = service.Bind(
        socket=fvp_socket, ip=env.fvp.any_ip, ip_len=len(env.fvp.any_ip), port=fvp_port
    )
    assert status.ok() and response.status >= 0

    if so_type == IOT_SOCKET_SOCK_STREAM:
        status, response = service.Listen(socket=fvp_socket, backlog=1)
        assert status.ok() and response.status >= 0
        accept_t = AcceptThread(fvp_socket, service)
        accept_t.start()
        if fvp_port:
            # Connect to the port the FVP socket was actually bound to, keeping
            # the flowinfo/scope-id tail of the configured address for IPv6.
            target = (env.fvp.ip, fvp_port) + tuple(env.fvp.address[2:])
        else:
            target = env.fvp.address
        host_socket.connect(target)
        accept_t.join()
        fvp_socket = accept_t.socket

    return fvp_socket


def send_data_fvp_to_host(service, data, fvp_socket, host_port, so_type, is_ipv6):
    """Send ``data`` from an FVP socket towards the host.

    Stream sockets use ``service.Send``; any other socket type uses
    ``service.SendTo`` addressed to the packed host IP and ``host_port``.

    Returns:
        ``(status, response)`` of the RPC; nothing is asserted here.
    """
    host_ip = env_info[is_ipv6].host.ip
    if is_ipv6 == 1:
        dest_ip = socket.inet_pton(socket.AF_INET6, host_ip)
    else:
        dest_ip = socket.inet_aton(host_ip)

    if so_type == IOT_SOCKET_SOCK_STREAM:
        rpc_status, reply = service.Send(socket=fvp_socket, buf=data, len=len(data))
        return rpc_status, reply

    rpc_status, reply = service.SendTo(
        socket=fvp_socket,
        buf=data,
        len=len(data),
        ip=dest_ip,
        ip_len=len(dest_ip),
        port=host_port,
    )
    return rpc_status, reply


def send_data_host_to_fvp(service, data, host_socket, fvp_port, so_type, is_ipv6):
    """Send ``data`` from a host socket towards the FVP.

    Stream sockets use ``host_socket.send``; any other socket type uses
    ``host_socket.sendto`` addressed to the FVP IP and ``fvp_port``.

    Args:
        service: unused; kept so the signature mirrors send_data_fvp_to_host.
        data: bytes to send.
        host_socket: Python socket on the host.
        fvp_port: destination port for datagram sockets.
        so_type: IOT_SOCKET_SOCK_STREAM or IOT_SOCKET_SOCK_DGRAM.
        is_ipv6: 0 or 1, index into env_info.

    Returns:
        The number of bytes sent, as reported by the socket call.
    """
    env = env_info[is_ipv6]
    if so_type == IOT_SOCKET_SOCK_STREAM:
        return host_socket.send(data)

    # The IPv6 FVP address is link-local, so the destination needs the scope id.
    # Reuse the (flowinfo, scope_id) tail of the configured address; it is empty
    # for IPv4, which leaves the usual (ip, port) pair.
    destination = (env.fvp.ip, fvp_port) + tuple(env.fvp.address[2:])
    return host_socket.sendto(data, destination)


# --- pytest fixtures


@pytest.fixture
def convert_fvp_ip_into_host_ip(is_ipv6):
    """Return a converter from packed IP bytes to a textual address.

    The returned callable wraps ``socket.inet_ntop`` with AF_INET6 when
    ``is_ipv6 == 1`` and with AF_INET otherwise.
    """
    family = socket.AF_INET6 if is_ipv6 == 1 else socket.AF_INET

    def convert(ip):
        return socket.inet_ntop(family, ip)

    return convert


@pytest.fixture
def convert_host_ip_into_fvp_ip(is_ipv6):
    """Return a converter from a textual address to packed IP bytes.

    The returned callable uses ``socket.inet_pton(AF_INET6, ...)`` when
    ``is_ipv6 == 1`` and ``socket.inet_aton`` otherwise.
    """
    if is_ipv6 == 1:

        def pack_ipv6(ip):
            return socket.inet_pton(socket.AF_INET6, ip)

        return pack_ipv6

    def pack_ipv4(ip):
        return socket.inet_aton(ip)

    return pack_ipv4


# returns 4 sockets: fvp_rcv, fvp_send, host_recv, host_send
@pytest.fixture
def get_socket_pairs(is_ipv6):
    """Return a factory that creates two FVP sockets and two host sockets.

    The factory is called as ``factory(service, so_type_fvp)`` and returns
    ``(fvp_socket_1, fvp_socket_2, host_socket_1, host_socket_2)``; the tests use
    them as fvp_recv, fvp_send, host_recv, host_send. The address family follows
    ``is_ipv6`` and the protocol / host socket type follow ``so_type_fvp``.
    SO_REUSEADDR is set on the first host socket only.
    """
    if is_ipv6 == 1:
        af_fvp, af_host = IOT_SOCKET_AF_INET6, socket.AF_INET6
    else:
        af_fvp, af_host = IOT_SOCKET_AF_INET, socket.AF_INET

    def get_sockets(service, so_type_fvp):
        # Compare by value: `is` on integers relies on interpreter caching.
        is_datagram = so_type_fvp == IOT_SOCKET_SOCK_DGRAM
        proto = IOT_SOCKET_IPPROTO_UDP if is_datagram else IOT_SOCKET_IPPROTO_TCP
        so_type_host = socket.SOCK_DGRAM if is_datagram else socket.SOCK_STREAM

        status, response = service.Create(af=af_fvp, type=so_type_fvp, protocol=proto)
        assert status.ok() and response.status >= 0
        status_send, response_send = service.Create(
            af=af_fvp, type=so_type_fvp, protocol=proto
        )
        assert status_send.ok() and response_send.status >= 0

        host_recv_socket = socket.socket(af_host, so_type_host)
        host_send_socket = socket.socket(af_host, so_type_host)
        host_recv_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # On success the reply status of Create is the new FVP socket id.
        return response.status, response_send.status, host_recv_socket, host_send_socket

    return get_sockets


@pytest.fixture
def get_socket_fvp(is_ipv6):
    """Return a factory that creates one FVP socket through the RPC service.

    The factory is called as ``factory(service, so_type_fvp)`` and returns the
    reply status of ``service.Create`` (asserted to be >= 0), which the tests use
    as the FVP socket id. The address family follows ``is_ipv6``.
    """
    af_fvp = IOT_SOCKET_AF_INET6 if is_ipv6 == 1 else IOT_SOCKET_AF_INET

    def get_socket(service, so_type_fvp):
        # Compare by value: `is` on integers relies on interpreter caching.
        proto = (
            IOT_SOCKET_IPPROTO_UDP
            if so_type_fvp == IOT_SOCKET_SOCK_DGRAM
            else IOT_SOCKET_IPPROTO_TCP
        )
        status, response = service.Create(af=af_fvp, type=so_type_fvp, protocol=proto)
        assert status.ok() and response.status >= 0
        return response.status

    return get_socket


@pytest.fixture
def get_socket_host(is_ipv6):
    """Return a factory that creates one host socket with SO_REUSEADDR set.

    The factory is called as ``factory(so_type_host)``; the socket family is
    AF_INET6 when ``is_ipv6 == 1`` and AF_INET otherwise.
    """
    family = socket.AF_INET6 if is_ipv6 == 1 else socket.AF_INET

    def open_host_socket(so_type_host):
        new_socket = socket.socket(family, so_type_host)
        new_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return new_socket

    return open_host_socket


@pytest.fixture
def host(is_ipv6):
    """Return the host endpoint description for the selected IP version."""
    host_info = socket_info_host[is_ipv6]
    return host_info


@pytest.fixture
def fvp(is_ipv6):
    """Return the FVP endpoint description for the selected IP version."""
    fvp_info = socket_info_fvp[is_ipv6]
    return fvp_info


@pytest.fixture
def env(is_ipv6):
    """Return the host/FVP environment description for the selected IP version."""
    selected = env_info[is_ipv6]
    return selected


# %-style template of the banner each test logs first; logger.info fills the two
# %s placeholders with the IP version label and the transport label.
HDR_FORMAT = "=============================== %s %s ==============================="


@pytest.mark.parametrize("is_ipv6", [(0), (1)])
@pytest.mark.parametrize("fixture_test_case", ["@test.json"], indirect=True)
class TestParametrized:
    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "so_type", [(IOT_SOCKET_SOCK_DGRAM), (IOT_SOCKET_SOCK_STREAM)]
    )
    async def test_echo_from_fvp_to_host(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_pairs,
        so_type,
        env,
    ):
        """Send a message FVP -> host, echo it host -> FVP and compare.

        The FVP send socket sends ``b"hello"`` to the host; whatever the host
        received is sent back to the FVP receive socket, and the bytes returned
        by ``RecvFrom`` must equal the original message.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            (
                recv_socket,
                send_socket,
                host_recv_socket,
                host_send_socket,
            ) = get_socket_pairs(service, so_type)
            # Stays None for datagram sockets and when the connect helper fails
            # before returning, so the cleanup below can test it directly.
            host_listen_socket = None
            try:
                hello = b"hello"

                host_recv_socket, host_listen_socket = connect_socket_fvp_to_host(
                    service,
                    send_socket,
                    0,
                    host_recv_socket,
                    env.host.port,
                    so_type,
                    is_ipv6,
                )
                status, response = send_data_fvp_to_host(
                    service, hello, send_socket, env.host.port, so_type, is_ipv6
                )
                assert status.ok() and response.status >= 0

                received_message, address = host_recv_socket.recvfrom(1024)

                recv_socket = connect_socket_host_to_fvp(
                    service,
                    host_send_socket,
                    0,
                    recv_socket,
                    env.fvp.port,
                    so_type,
                    is_ipv6,
                )
                send_data_host_to_fvp(
                    service,
                    received_message,
                    host_send_socket,
                    env.fvp.port,
                    so_type,
                    is_ipv6,
                )

                status, response = service.RecvFrom(socket=recv_socket, len=len(hello))
                assert status.ok() and response.status >= 0
                assert hello == response.buf

            finally:
                host_recv_socket.close()
                host_send_socket.close()
                if host_listen_socket is not None:
                    host_listen_socket.close()

    @pytest.mark.asyncio
    @pytest.mark.parametrize("so_type", [(IOT_SOCKET_SOCK_STREAM)])
    async def test_listen_accept_twice(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_pairs,
        so_type,
        env,
    ):
        """Accept two host connections on one listening FVP socket.

        The FVP receive socket is bound and put into listening mode with a
        backlog of 2; two host sockets connect one after the other, each sends
        ``b"hello"``, and both accepted FVP sockets must receive it.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            (
                recv_socket,
                send_socket,
                host_send_socket,
                host_send_socket2,
            ) = get_socket_pairs(service, so_type)
            try:
                hello = b"hello"

                status, response = service.Bind(
                    socket=recv_socket,
                    ip=env.fvp.any_ip,
                    ip_len=len(env.fvp.any_ip),
                    port=env.fvp.port,
                )
                assert status.ok() and response.status >= 0

                status, response = service.Listen(socket=recv_socket, backlog=2)
                assert status.ok() and response.status >= 0

                # env.fvp.address already matches the IP version under test, so
                # no per-version branching is needed for the connects.
                accept_t = AcceptThread(recv_socket, service)
                accept_t.start()
                host_send_socket.connect(env.fvp.address)
                accept_t.join()

                accept_t2 = AcceptThread(recv_socket, service)
                accept_t2.start()
                host_send_socket2.connect(env.fvp.address)
                accept_t2.join()

                accept_socket = accept_t.socket
                accept_socket2 = accept_t2.socket

                host_send_socket.send(hello)
                host_send_socket2.send(hello)

                status, response = service.RecvFrom(
                    socket=accept_socket, len=len(hello)
                )
                assert status.ok() and response.status >= 0
                assert hello == response.buf
                status, response = service.RecvFrom(
                    socket=accept_socket2, len=len(hello)
                )
                assert status.ok() and response.status >= 0
                assert hello == response.buf

            finally:
                host_send_socket.close()
                host_send_socket2.close()

    @pytest.mark.asyncio
    async def test_send_continuous(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_fvp,
        get_socket_host,
        env,
    ):
        """Interleave UDP and TCP sends from the FVP and check every payload.

        A TCP connection FVP -> host is set up on ``env.host.port + 1`` and a UDP
        host socket is bound on ``env.host.port``. For 50 rounds the FVP sends one
        datagram and one stream message, and the host must receive both intact.
        """
        logger.info(HDR_FORMAT, "IP6" if is_ipv6 == 1 else "IP4", "UDP & TCP")
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            send_socket_udp = get_socket_fvp(service, IOT_SOCKET_SOCK_DGRAM)
            send_socket_tcp = get_socket_fvp(service, IOT_SOCKET_SOCK_STREAM)
            host_recv_socket_udp = get_socket_host(socket.SOCK_DGRAM)
            host_listen_socket = get_socket_host(socket.SOCK_STREAM)
            # Set once the connection is accepted; lets the cleanup test it.
            host_recv_socket_tcp = None
            try:
                hello = b"hello---" * 32
                byebye = b"byebye--" * 32

                host_recv_socket_udp.bind((env.host.any_ip, env.host.port))
                host_listen_socket.bind((env.host.any_ip, env.host.port + 1))

                dest_ip = convert_host_ip_into_fvp_ip(env.host.ip)

                host_listen_socket.listen()
                accept_t = AcceptThread(host_listen_socket, None)
                accept_t.start()
                status, response = service.Connect(
                    socket=send_socket_tcp,
                    ip=dest_ip,
                    ip_len=len(dest_ip),
                    port=env.host.port + 1,
                )
                assert status.ok() and response.status >= 0
                accept_t.join()
                host_recv_socket_tcp = accept_t.socket

                for _ in range(50):
                    status, response = service.SendTo(
                        socket=send_socket_udp,
                        buf=byebye,
                        len=len(byebye),
                        ip=dest_ip,
                        ip_len=len(dest_ip),
                        port=env.host.port,
                    )
                    assert status.ok() and response.status >= 0
                    status, response = service.Send(
                        socket=send_socket_tcp, buf=hello, len=len(hello)
                    )
                    assert status.ok() and response.status >= 0
                    received_message, _ = host_recv_socket_udp.recvfrom(1024)
                    assert received_message == byebye
                    # TCP is a byte stream: one read may return only part of the
                    # message, so wait until exactly len(hello) bytes arrived.
                    received_message = host_recv_socket_tcp.recv(
                        len(hello), socket.MSG_WAITALL
                    )
                    assert received_message == hello

            finally:
                host_recv_socket_udp.close()
                host_listen_socket.close()
                if host_recv_socket_tcp is not None:
                    host_recv_socket_tcp.close()

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "so_type", [(IOT_SOCKET_SOCK_DGRAM), (IOT_SOCKET_SOCK_STREAM)]
    )
    async def test_get_sock_name(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_fvp,
        get_socket_host,
        so_type,
        env,
    ):
        """Check GetSockName before and after binding an FVP socket.

        Before ``Bind`` the reply status must be non-zero; after binding to the
        FVP wildcard address and ``env.fvp.port`` the reply must report that
        port, the wildcard address and its length.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            fvp_socket = get_socket_fvp(service, so_type)

            # Not bound yet: only the reply status is checked here.
            status, response = service.GetSockName(socket=fvp_socket)
            assert response.status != 0

            status, response = service.Bind(
                socket=fvp_socket,
                ip=env.fvp.any_ip,
                ip_len=len(env.fvp.any_ip),
                port=env.fvp.port,
            )
            assert status.ok() and response.status >= 0

            status, response = service.GetSockName(socket=fvp_socket)
            assert status.ok() and response.status >= 0
            assert response.port == env.fvp.port
            assert response.ip == env.fvp.any_ip
            assert response.ip_len == len(env.fvp.any_ip)

    @pytest.mark.asyncio
    async def test_get_peer_name(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_fvp,
        get_socket_host,
        env,
    ):
        """Check GetPeerName on an FVP stream socket connected to the host.

        The FVP socket connects to a listening host socket on ``env.host.port``;
        ``GetPeerName`` must then report that port, the packed host IP and its
        length.
        """
        logger.info(HDR_FORMAT, "IP6" if is_ipv6 == 1 else "IP4", "TCP")
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            fvp_socket = get_socket_fvp(service, IOT_SOCKET_SOCK_STREAM)
            host_listen_socket = get_socket_host(socket.SOCK_STREAM)
            # Set once the connection is accepted; lets the cleanup test it.
            host_recv_socket_tcp = None
            try:
                host_listen_socket.bind((env.host.any_ip, env.host.port))

                dest_ip = convert_host_ip_into_fvp_ip(env.host.ip)

                host_listen_socket.listen()
                accept_t = AcceptThread(host_listen_socket, None)
                accept_t.start()
                status, response = service.Connect(
                    socket=fvp_socket,
                    ip=dest_ip,
                    ip_len=len(dest_ip),
                    port=env.host.port,
                )
                assert status.ok() and response.status >= 0
                accept_t.join()
                host_recv_socket_tcp = accept_t.socket

                status, response = service.GetPeerName(socket=fvp_socket)
                assert status.ok() and response.status >= 0
                assert response.port == env.host.port
                assert response.ip == dest_ip
                assert response.ip_len == len(dest_ip)

            finally:
                host_listen_socket.close()
                if host_recv_socket_tcp is not None:
                    host_recv_socket_tcp.close()

    @pytest.mark.skip(reason="we need DHCP to work to get DNS server")
    @pytest.mark.asyncio
    async def test_get_host_by_name(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_fvp,
        get_socket_host,
        env,
    ):
        """Resolve ``arm.com`` through the RPC service and check the IP length.

        The reply must succeed and report an address as long as the FVP wildcard
        address of the IP version under test.
        """
        ip_version = "IP6" if is_ipv6 == 1 else "IP4"
        logger.info(HDR_FORMAT, ip_version, "")
        async with fixture_test_case as context:
            service = rpc_service_setup(context)

            hostname = b"arm.com"
            rpc_status, reply = service.GetHostByName(
                name=hostname, name_len=len(hostname), af=env.fvp.af_type
            )
            assert rpc_status.ok() and reply.status >= 0
            # The wildcard address has the byte length of this IP version.
            assert reply.ip_len == len(env.fvp.any_ip)

    @pytest.mark.asyncio
    @pytest.mark.parametrize("so_type", [(IOT_SOCKET_SOCK_STREAM)])
    async def test_shutdown_and_close(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_pairs,
        so_type,
        env,
    ):
        """Check that Shutdown only disables the selected direction.

        Sending side: after ``SHUTDOWN_RD`` the FVP send socket can still send;
        after ``SHUTDOWN_WR`` a further send must report a negative status.
        Receiving side: after ``SHUTDOWN_WR`` the accepted FVP socket can still
        receive; after ``SHUTDOWN_RD`` ``RecvFrom`` must report a negative status.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            (
                recv_socket,
                send_socket,
                host_recv_socket,
                host_send_socket,
            ) = get_socket_pairs(service, so_type)
            # Stays None when the connect helper fails before returning, so the
            # cleanup below can test it directly.
            host_listen_socket = None
            try:
                hello = b"hello"

                # --- FVP as sender
                host_recv_socket, host_listen_socket = connect_socket_fvp_to_host(
                    service,
                    send_socket,
                    env.host.port,
                    host_recv_socket,
                    env.host.port,
                    so_type,
                    is_ipv6,
                )
                status, response = send_data_fvp_to_host(
                    service, hello, send_socket, env.host.port, so_type, is_ipv6
                )
                assert status.ok() and response.status >= 0
                received_message, address = host_recv_socket.recvfrom(1024)

                status, response = service.Shutdown(
                    socket=send_socket, option=IOT_SOCKET_SHUTDOWN_RD
                )
                assert status.ok() and response.status >= 0

                # Sending must still work with only the read side shut down.
                status, response = send_data_fvp_to_host(
                    service, hello, send_socket, env.host.port, so_type, is_ipv6
                )
                assert status.ok() and response.status >= 0
                received_message, address = host_recv_socket.recvfrom(1024)

                status, response = service.Shutdown(
                    socket=send_socket, option=IOT_SOCKET_SHUTDOWN_WR
                )
                assert status.ok() and response.status >= 0

                # Sending must fail once the write side is shut down.
                status, response = send_data_fvp_to_host(
                    service, hello, send_socket, env.host.port, so_type, is_ipv6
                )
                assert status.ok() and response.status < 0

                # --- FVP as receiver
                recv_socket = connect_socket_host_to_fvp(
                    service,
                    host_send_socket,
                    0,
                    recv_socket,
                    env.fvp.port,
                    so_type,
                    is_ipv6,
                )
                sent = send_data_host_to_fvp(
                    service, hello, host_send_socket, env.fvp.port, so_type, is_ipv6
                )
                assert sent >= 0
                status, response = service.RecvFrom(socket=recv_socket, len=len(hello))
                assert status.ok() and response.status >= 0

                status, response = service.Shutdown(
                    socket=recv_socket, option=IOT_SOCKET_SHUTDOWN_WR
                )
                assert status.ok() and response.status >= 0

                # Receiving must still work with only the write side shut down.
                sent = send_data_host_to_fvp(
                    service, hello, host_send_socket, env.fvp.port, so_type, is_ipv6
                )
                assert sent >= 0
                status, response = service.RecvFrom(socket=recv_socket, len=len(hello))
                assert status.ok() and response.status >= 0

                status, response = service.Shutdown(
                    socket=recv_socket, option=IOT_SOCKET_SHUTDOWN_RD
                )
                assert status.ok() and response.status >= 0

                # Receiving must fail once the read side is shut down.
                sent = send_data_host_to_fvp(
                    service, hello, host_send_socket, env.fvp.port, so_type, is_ipv6
                )
                assert sent >= 0
                status, response = service.RecvFrom(socket=recv_socket, len=len(hello))
                assert status.ok() and response.status < 0

            finally:
                host_recv_socket.close()
                host_send_socket.close()
                if host_listen_socket is not None:
                    host_listen_socket.close()

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "so_type", [(IOT_SOCKET_SOCK_DGRAM), (IOT_SOCKET_SOCK_STREAM)]
    )
    async def test_use_invalid_socket(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_fvp,
        so_type,
        env,
    ):
        """Check the status RecvFrom reports for unusable sockets.

        ``RecvFrom`` on socket id 1 before any socket was created, and on a
        socket that has been closed, must report ``IOT_SOCKET_ESOCK``. On an open
        socket with ``IOT_SOCKET_IO_FIONBIO`` set, a zero-length ``RecvFrom``
        must report ``IOT_SOCKET_EAGAIN``.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)

            # No socket has been created yet in this test.
            status, response = service.RecvFrom(socket=1, len=1)
            assert status.ok() and response.status == IOT_SOCKET_ESOCK

            fvp_socket = get_socket_fvp(service, so_type)
            # NOTE(review): presumably switches the socket to non-blocking mode.
            status, response = service.SetOpt(
                socket=fvp_socket,
                opt_id=IOT_SOCKET_IO_FIONBIO,
                val=bytes.fromhex("01000000"),
            )
            assert status.ok() and response.status >= 0

            status, response = service.RecvFrom(socket=fvp_socket, len=0)
            assert status.ok() and response.status == IOT_SOCKET_EAGAIN

            status, response = service.Close(socket=fvp_socket)
            assert status.ok() and response.status >= 0

            # The id is no longer valid after Close.
            status, response = service.RecvFrom(socket=fvp_socket, len=1)
            assert status.ok() and response.status == IOT_SOCKET_ESOCK

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "so_type", [(IOT_SOCKET_SOCK_DGRAM), (IOT_SOCKET_SOCK_STREAM)]
    )
    async def test_socket_opts(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_fvp,
        so_type,
        env,
    ):
        """Set and read back ``IOT_SOCKET_SO_KEEPALIVE`` on an FVP socket.

        ``GetOpt`` on socket id 1 before any socket was created must fail. After
        setting the option to ``01 00 00 00`` the value read back must differ
        from four zero bytes; after setting four zero bytes it must equal them.
        The socket is closed at the end.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        # Option payloads: four bytes, first byte set / all bytes clear.
        enabled = bytes.fromhex("01000000")
        disabled = bytes.fromhex("00000000")
        async with fixture_test_case as context:
            service = rpc_service_setup(context)

            # No socket has been created yet in this test.
            status, response = service.GetOpt(socket=1, opt_id=IOT_SOCKET_SO_TYPE)
            assert status.ok() and response.status < 0

            fvp_socket = get_socket_fvp(service, so_type)

            status, response = service.SetOpt(
                socket=fvp_socket,
                opt_id=IOT_SOCKET_SO_KEEPALIVE,
                val=enabled,
            )
            assert status.ok() and response.status >= 0
            status, response = service.GetOpt(
                socket=fvp_socket, opt_id=IOT_SOCKET_SO_KEEPALIVE
            )
            assert status.ok() and response.status >= 0
            # Only "not disabled" is required; the exact value is not pinned.
            assert response.val != disabled

            status, response = service.SetOpt(
                socket=fvp_socket,
                opt_id=IOT_SOCKET_SO_KEEPALIVE,
                val=disabled,
            )
            assert status.ok() and response.status >= 0
            status, response = service.GetOpt(
                socket=fvp_socket, opt_id=IOT_SOCKET_SO_KEEPALIVE
            )
            assert status.ok() and response.status >= 0
            assert response.val == disabled

            status, response = service.Close(socket=fvp_socket)
            assert status.ok() and response.status >= 0

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "so_type", [(IOT_SOCKET_SOCK_DGRAM), (IOT_SOCKET_SOCK_STREAM)]
    )
    async def test_select(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_pairs,
        so_type,
        env,
    ):
        """Check the results reported by SelectThread for a connected socket pair.

        Three scenarios are run in order on the two FVP sockets:

        1. Both sockets in the read and write sets: status 2, nothing
           readable, both sockets writable.
        2. Both sockets in the read set only, no data pending: status 0 and
           both result lists empty.
        3. Both sockets in the read set only while the host sends data:
           status 1 and only ``recv_socket`` reported readable; the data is
           then drained with RecvFrom.

        Host-side sockets are always closed on exit, including the optional
        listening socket returned by ``connect_socket_fvp_to_host``.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP" if so_type == IOT_SOCKET_SOCK_DGRAM else "TCP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            (
                recv_socket,
                send_socket,
                host_recv_socket,
                host_send_socket,
            ) = get_socket_pairs(service, so_type)
            # Bound up front so the cleanup below can test it directly instead
            # of probing locals() when the connect helper never ran.
            host_listen_socket = None
            try:
                hello = b"hello"

                # Clear FIONBIO on both FVP sockets.
                status, response = service.SetOpt(
                    socket=recv_socket,
                    opt_id=IOT_SOCKET_IO_FIONBIO,
                    val=bytes.fromhex("00000000"),
                )
                assert status.ok() and response.status >= 0
                status, response = service.SetOpt(
                    socket=send_socket,
                    opt_id=IOT_SOCKET_IO_FIONBIO,
                    val=bytes.fromhex("00000000"),
                )
                assert status.ok() and response.status >= 0

                # Both helpers may hand back replacement sockets, so rebind
                # the local names to whatever they return.
                host_recv_socket, host_listen_socket = connect_socket_fvp_to_host(
                    service,
                    send_socket,
                    0,
                    host_recv_socket,
                    env.host.port,
                    so_type,
                    is_ipv6,
                )
                recv_socket = connect_socket_host_to_fvp(
                    service,
                    host_send_socket,
                    0,
                    recv_socket,
                    env.fvp.port,
                    so_type,
                    is_ipv6,
                )

                # Scenario 1: read + write sets; expect both writable only.
                # NOTE(review): the last SelectThread argument looks like a
                # timeout -- confirm units against SelectThread.
                read_sockets = [recv_socket, send_socket]
                write_sockets = [recv_socket, send_socket]
                select_t = SelectThread(service, read_sockets, write_sockets, 100)
                select_t.start()
                select_t.join()
                assert select_t.status == 2
                assert select_t.read_sockets == []
                assert (
                    recv_socket in select_t.write_sockets
                    and send_socket in select_t.write_sockets
                )

                # Scenario 2: read set only with no pending data.
                read_sockets = [recv_socket, send_socket]
                write_sockets = []
                select_t = SelectThread(service, read_sockets, write_sockets, 100)
                select_t.start()
                select_t.join()
                assert select_t.status == 0
                assert select_t.read_sockets == []
                assert select_t.write_sockets == []

                # Scenario 3: start the select first, then send from the host
                # so the data arrives while the thread is running.
                read_sockets = [recv_socket, send_socket]
                write_sockets = []
                select_t = SelectThread(service, read_sockets, write_sockets, 10000)
                select_t.start()

                send_data_host_to_fvp(
                    service, hello, host_send_socket, env.fvp.port, so_type, is_ipv6
                )

                select_t.join()
                assert select_t.status == 1
                assert select_t.read_sockets == [recv_socket]
                assert select_t.write_sockets == []

                status, response = service.RecvFrom(socket=recv_socket, len=len(hello))
                assert status.ok() and response.status >= 0

            finally:
                host_recv_socket.close()
                host_send_socket.close()
                if host_listen_socket is not None:
                    host_listen_socket.close()

    @pytest.mark.asyncio
    @pytest.mark.parametrize("so_type", [(IOT_SOCKET_SOCK_DGRAM)])
    async def test_send_recv_msg(
        self,
        is_ipv6,
        fixture_test_case,  # noqa: F811
        logger,  # noqa: F811
        convert_fvp_ip_into_host_ip,
        convert_host_ip_into_fvp_ip,
        get_socket_pairs,
        so_type,
        env,
    ):
        """Exchange one datagram in each direction using SendMsg and RecvMsg.

        FVP -> host: the payload is sent with SendMsg and read on the host
        with ``recvfrom``. Host -> FVP: the FVP socket is bound, the host
        sends the payload with ``sendto`` and the FVP reads it with RecvMsg.

        The ``msg_iov`` field carries a 4-byte big-endian length followed by
        the payload bytes, in both the request and the response.
        """
        logger.info(
            HDR_FORMAT,
            "IP6" if is_ipv6 == 1 else "IP4",
            "UDP",
        )
        async with fixture_test_case as context:
            service = rpc_service_setup(context)
            (
                recv_socket,
                send_socket,
                host_recv_socket,
                host_send_socket,
            ) = get_socket_pairs(service, so_type)
            try:
                hello = b"hello"

                host_recv_socket.bind((env.host.any_ip, env.host.port))

                # Single iov entry: big-endian uint32 length prefix + payload.
                len_bytes = (len(hello)).to_bytes(4, byteorder="big")
                msg_iov_bytes = len_bytes + hello

                # The RPC takes the destination as packed binary address bytes.
                dest_ip = (
                    socket.inet_pton(socket.AF_INET6, env.host.ip)
                    if is_ipv6 == 1
                    else socket.inet_aton(env.host.ip)
                )

                status, response = service.SendMsg(
                    socket=send_socket,
                    ip=dest_ip,
                    port=env.host.port,
                    msg_iov=msg_iov_bytes,
                    msg_iovlen=1,
                    msg_control=b"",
                    msg_controllen=0,
                    flags=0,
                )

                assert status.ok() and response.status >= 0

                received_message, _ = host_recv_socket.recvfrom(1024)
                assert received_message == hello

                status, response = service.Bind(
                    socket=recv_socket,
                    ip=env.fvp.any_ip,
                    ip_len=len(env.fvp.any_ip),
                    port=env.fvp.port,
                )
                # Fail here rather than at RecvMsg below if the bind was
                # rejected.
                assert status.ok() and response.status >= 0

                host_send_socket.sendto(hello, (env.fvp.ip, env.fvp.port))

                status, response = service.RecvMsg(socket=recv_socket, flags=0)
                assert status.ok() and response.status >= 0
                # first 4 bytes is big endian uint32_t encoded length
                length = int.from_bytes(response.msg_iov[:4], "big")
                assert length == len(hello)
                assert response.msg_iov[4:] == hello
                assert response.msg_iovlen == 1

            finally:
                host_recv_socket.close()
                host_send_socket.close()
